home *** CD-ROM | disk | FTP | other *** search
/ Languguage OS 2 / Languguage OS II Version 10-94 (Knowledge Media)(1994).ISO / gnu / ms-0_06.lha / bms-0.06 / work.c < prev    next >
C/C++ Source or Header  |  1993-08-06  |  16KB  |  641 lines

  1. /* work.c -- MandelSpawn work distribution */
  2.  
  3. /* Queue requests for calculation, possibly from different clients */
  4. /* (such as multiple windows), distribute them among the computation */
  5. /* servers and return the results to the client that requested them. */
  6.  
  7. /* See work.h for interface definition. */
  8.  
  9. /*  This file is part of MandelSpawn, a parallel Mandelbrot program.
  10.  
  11.     Copyright (C) 1990, 1991 Andreas Gustafsson
  12.  
  13.     MandelSpawn is free software; you can redistribute it and/or modify
  14.     it under the terms of the GNU General Public License, version 1,
  15.     as published by the Free Software Foundation.
  16.  
  17.     MandelSpawn is distributed in the hope that it will be useful,
  18.     but WITHOUT ANY WARRANTY; without even the implied warranty of
  19.     MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
  20.     GNU General Public License for more details.
  21.  
  22.     You should have received a copy of the GNU General Public License,
  23.     version 1, along with this program; if not, write to the Free 
  24.     Software Foundation, Inc., 675 Mass Ave, Cambridge, MA 02139, USA.
  25. */
  26.  
  27.  
  28. #include <stdio.h>
  29. #include <string.h>
  30. #include <errno.h>    /* pre-X11R4 systems need this for EWOULDBLOCK */
  31. #include "work.h"
  32. #include "ms_ipc.h"
  33.  
  34. typedef struct slave
  35. { char *name_string;        /* machine name of the slave */
  36.   struct sockaddr_in name;     /* network address of the slave */
  37.   int has_timeout;         /* the timeout has been initialized */
  38.   unsigned timeout;        /* timeout in milliseconds */
  39. #ifndef OLD_TIMEOUT
  40.   long timeout_at;        /* when to timeout (seconds since epoc) */
  41. #else
  42.   char *timer_id;         /* timeout id (for removing the timeout) */
  43. #endif
  44.   unsigned int n_timeouts;     /* how many times the slave has timed out */
  45.   unsigned int n_packets;    /* number of packets that have arrived */
  46.   unsigned int n_late_packets;    /* number of packets that arrived too late */
  47.   unsigned long mi_count;     /* number of iterations done by this slave */
  48.   unsigned int no;        /* slave serial number */
  49.   struct wf_state *backptr;    /* back pointer to the wf_state */
  50.   int disabled;            /* slave disabled due to error */
  51. } slave;
  52.  
  53. typedef struct chunk
  54. { struct chunk *next;
  55.   struct chunk *prev;
  56.   int drawn;            /* true if at least one reply has arrived */
  57.   char *client;         /* pointer to widget owning this chunk */
  58.   unsigned int no;         /* serial number within current sequence */
  59.   char *client_data;        /* client data (unknown size) */
  60.   char *slave_data;        /* slave data */
  61.   unsigned int slave_datalen;    /* length of slave data */
  62. } chunk;
  63.  
  64. struct wf_state
  65. { int my_socket;          /* socket for communicating with slaves */
  66.   int sequence;            /* current sequence number */
  67.   int pid;            /* pid of this process */
  68.   unsigned n_slaves;         /* number of slaves */
  69.   struct slave **slaves;    /* array of pointers to slave descriptors */
  70.   int n_chunks;            /* number of chunks in the active sequence */
  71.   unsigned int max_chunks;     /* current size of chunk index */
  72.   struct chunk **chunks;     /* pointer to the chunk index */
  73.   struct chunk to_draw;     /* head of queue of chunks to be drawn */
  74.   struct chunk drawn;        /* head of queue of chunks already drawn */
  75. };
  76.  
  77. /* forward refs */
  78. static void handle_reply_msg(), whip_slave(), timeout_set(), timeout_unset();
  79.  
  80. /* names of files containing server hostnames */
  81. #define PERSONAL_SLAVEFILE ".mslaves" 
  82. #ifndef PUBLIC_SLAVEFILE
  83. #define PUBLIC_SLAVEFILE "/usr/local/etc/mslaves"
  84. #endif
  85.  
  86. #define INITIAL_CHUNKS 1024
  87.  
  88. #define MAX_WORKPACKET_SIZE 64
  89. #define DATAGRAM_BYTES 1200
  90.  
  91. char *getenv();
  92. struct hostent *gethostbyname();
  93.  
  94. /* Make a copy of a string in wf_alloc'ed memory (strdup() lookalike) */
  95.  
  96. static char *
  97. my_strdup(str) 
  98.      char *str;
  99. { unsigned int length=strlen(str);
  100.   register char *r = (char *) wf_alloc(length+1);
  101.   if(length>0)
  102.     bcopy (str, r, (int) length);
  103.   r[length] = '\0';
  104.   return(r);
  105. }
  106.  
  107.  
  108. /* Get the address of a host given either the host name or the address */
  109. /* in nnn.nnn.nnn.nnn notation */
  110.  
  111. static struct in_addr *
  112. gethostaddrbywhatever(p)
  113.      char *p;
  114. { struct hostent *hp;
  115.  
  116.   /* statically allocated return value buffer */
  117.   /* (this is ugly, but gethostbyname() is no better) */
  118.   static struct in_addr nobyinaddr;
  119.  
  120.   if ((hp=gethostbyname(p)) != NULL)
  121.   { return((struct in_addr *) (hp->h_addr));
  122.   }
  123.   else
  124.   { /* try it as a number nnn.nnn.nnn.nnn */
  125.     if ((nobyinaddr.s_addr=inet_addr(p)) == -1)
  126.     { return(0);
  127.     }
  128.     return(&nobyinaddr);
  129.   }
  130. }
  131.  
  132.  
  133. /* Null-terminate the current field in a whitespace-separated list */
  134. /* and return a pointer to the next field or NULL if there is no */
  135. /* next field.  Text after a hash sign is taken as comment and ignored */
  136.  
  137. static char *
  138. next_field(p) char *p;
  139. { while(1)
  140.   { register char c = *p;
  141.     switch(c)
  142.     {
  143.     case ' ':
  144.     case '\t':
  145.       *p = '\0';
  146.       while(1)
  147.       { int c = *++p;
  148.     if(c == '\n' || c == '\0') return(0);
  149.     if(c != ' ' && c != '\t') break;
  150.       }
  151.       return(p);
  152.     case '\0':
  153.       return(0);
  154.     case '\n':
  155.     case '#':
  156.       *p = '\0';
  157.       return(0);
  158.     default:
  159.       break;
  160.     }
  161.     p++;
  162.   }
  163. }
  164.  
  165.  
  166. wf_state *
  167. wf_init(timeout) unsigned timeout;
  168. { FILE *f;
  169.   int i;
  170.   char *filename, *home;
  171.   unsigned int size=16; /* initial size of slave table */
  172.   char buf[256];
  173.   wf_state *wf = (wf_state *) wf_alloc(sizeof(wf_state));
  174.  
  175.   /* general initialization: */
  176.   
  177.   /* set up the chunk index */
  178.   wf->max_chunks=INITIAL_CHUNKS;
  179.   wf->n_chunks=0;
  180.   wf->chunks=(chunk **) wf_alloc(wf->max_chunks * sizeof(chunk *));
  181.   
  182.   /* make a socket for communicating with the slaves */
  183.   if((wf->my_socket=
  184.       socket(AF_INET, SOCK_DGRAM, 0)) < 0) 
  185.     wf_error("socket");
  186.   
  187.   /* make sure we don't block reading the socket */  
  188.   if(fcntl(wf->my_socket, F_SETFL, FNDELAY) == -1)
  189.     wf_error("unblocking socket");
  190.  
  191.   /* set up the chunk queues */
  192.   wf->to_draw.prev=wf->to_draw.next= &wf->to_draw;
  193.   wf->drawn.prev=wf->drawn.next= &wf->drawn;
  194.  
  195.   wf->pid=getpid();
  196.   wf->sequence=0;
  197.   
  198.   /* .mslaves file stuff */
  199.   wf->slaves=(slave **) wf_alloc(size*sizeof(slave *));
  200.   home=getenv("HOME");
  201.   if(!home)
  202.     wf_error("HOME not set");
  203.   filename=wf_alloc((unsigned)(strlen(home)+1+strlen(PERSONAL_SLAVEFILE)+1));
  204.   strcpy(filename, home); 
  205.   strcat(filename, "/");
  206.   strcat(filename, PERSONAL_SLAVEFILE);
  207.   f=fopen(filename, "r");
  208.   if(!f)
  209.   { f=fopen(PUBLIC_SLAVEFILE, "r");
  210.   }
  211.   if(!f)
  212.   { wf_error("Could not find .mslaves file");
  213.   }
  214.   i=0;
  215.   while(1)
  216.   { char *p; /* points to current field in .mslaves line */
  217.     char *q; /* points to next field in .mslaves line */
  218.     slave *s;
  219.     struct in_addr *ina;
  220.     unsigned port;
  221.  
  222.     if(!fgets(buf, sizeof(buf), f))
  223.       break;
  224.     if(buf[0]=='\n' || buf[0]=='#')
  225.       continue;
  226.     p=buf;
  227.     q=next_field(p);
  228.     ina=gethostaddrbywhatever(p);
  229.  
  230.     if(ina==0)
  231.     { static char warn[] = "unknown host in .mslaves, ignored: ";
  232.       /* trying to keep up with GNU coding standards... */
  233.       char *msg = wf_alloc(sizeof(warn)+strlen(p));
  234.       strcpy(msg, warn);
  235.       strcat(msg, p);
  236.       wf_warn(msg);
  237.       wf_free(msg);
  238.       continue;
  239.     }
  240.  
  241.     p=q;
  242.     port=DEFAULT_PORT;
  243.     if(p) /* there is a "port" field */
  244.     { q=next_field(p);
  245.       port=(unsigned) atoi(p);
  246.       if(port==0) /* probably not an integer, and port 0 is bad anyway */
  247.       {    static char warn[] = "bad port field in .mslaves, machine ignored: ";
  248.     char *msg = wf_alloc(sizeof(warn)+strlen(p));
  249.     strcpy(msg, warn);
  250.     strcat(msg, p);
  251.     wf_warn(msg);
  252.     wf_free(msg);
  253.     continue;
  254.       }
  255.     }
  256.  
  257.     if(q)
  258.     { wf_warn("trailing junk in .mslaves");
  259.     }
  260.  
  261.     if(i>=size) 
  262.     { size *= 2;
  263.       wf->slaves=
  264.     (slave **) wf_realloc((char *) wf->slaves, size*sizeof(slave *));
  265.     }
  266.     s=(slave *) wf_alloc(sizeof(slave));  
  267.     bcopy((char *) ina, (char *) &s->name.sin_addr, sizeof(struct in_addr));
  268.     s->name.sin_family = AF_INET;
  269.     s->name.sin_port = htons(port);
  270.     
  271.     s->name_string = my_strdup(buf);
  272.     s->mi_count = 0L;
  273.     s->has_timeout = 0;
  274.     s->n_timeouts = s->n_packets = s->n_late_packets = 0;
  275.     s->no = i;
  276.     s->backptr = wf;
  277.     s->disabled = 0;
  278.     s->timeout = timeout;
  279.     wf->slaves[i] = s;
  280.     i++;
  281.   }
  282.   wf->n_slaves = i;
  283.   return(wf);
  284. }
  285.  
  286.  
  287. /* Delete the chunk "c" from anywhere in the queue */
  288. /* Don't deallocate the chunk yet because duplicated messages may still */
  289. /* refer to it */
  290.  
  291. static void
  292. queue_delete(c)
  293.      chunk *c;
  294. { c->next->prev=c->prev;
  295.   c->prev->next=c->next;
  296.   c->next=c->prev=NULL; /* just for easier debugging */
  297. }
  298.  
  299.  
  300. /* Add a chunk to the head of the queue */
  301.  
  302. static void
  303. queue_add(q, c) chunk *q; chunk *c;
  304. { c->prev=q->prev;
  305.   c->next=q;
  306.   q->prev->next=c;
  307.   q->prev=c;
  308. }
  309.  
  310.  
  311. /* Test the emptiness of a chunk queue */
  312.  
  313. static int
  314. queue_empty(q)
  315.      chunk *q;
  316. { return(q->next==q && q->prev==q);
  317. }
  318.  
  319.  
  320. /* Return the head of a chunk queue */
  321.  
  322. static chunk *
  323. queue_head(q)
  324.      chunk *q;
  325. { return(q->next);
  326. }
  327.  
  328.  
  329. void wf_handle_socket_input(wf, client_data)
  330.      wf_state *wf; char *client_data;
  331. { Message msg;
  332.   int fromlen;
  333.   /* This previously used read(), but some non-BSD TCP/IP implementations */
  334.   /* don't allow it to be used with connectionless sockets.  Also, */
  335.   /* while the Sun implementation does allow for a null pointer */
  336.   /* for the "from" argument in recvfrom(), the "fromlen" argument */
  337.   /* may not be a null pointer. */
  338.   if(recvfrom(wf->my_socket, (char *) &msg, sizeof(msg),
  339.           0, (struct sockaddr *) 0, &fromlen) == -1)
  340.  
  341.   {
  342. #ifdef XMS
  343. #ifndef R4
  344.     /* If you get "operation would block" errors, remove the #ifdef and */
  345.     /* #ifndef around this code, and send me (gson@niksula.hut.fi) */
  346.     /* a bug report mentioning your Xt library vendor and version. */
  347.     /* Does anyone have a clue why lots of people used to get these errors */
  348.     /* with X11R3? */
  349.     if(errno != EWOULDBLOCK)
  350. #endif
  351. #endif
  352.     { perror("read");
  353.       wf_error("slave input socket read");
  354.     }
  355.   }
  356.   else
  357.   { handle_reply_msg(wf, &msg);
  358.   }
  359. }
  360.  
  361.  
  362.  
  363. void wf_timed_out(client_data)
  364.   char *client_data;
  365. { slave *s=(slave *) client_data;
  366.   s->has_timeout=0;
  367.   s->n_timeouts++;
  368.   whip_slave(s->backptr, s);
  369. }
  370.  
  371. /* Set a timeout for a slave */
  372. /* This is done when the slave is whipped */
  373.  
  374. static void
  375. timeout_set(s)
  376.      slave *s;
  377. {
  378. #ifndef OLD_TIMEOUT
  379.   s->timeout_at = time((long *)0) + s->timeout / 1000;
  380. #else
  381.   /* first make sure there isn't a timeout already */
  382.   timeout_unset(s);
  383.   s->timer_id=wf_add_timeout(s->timeout, (char *) s);
  384. #endif
  385.   s->has_timeout=1;
  386. }
  387.  
  388.  
  389. /* Remove a timeout */
  390.  
  391. static void 
  392. timeout_unset(s)
  393.      slave *s;
  394. {
  395. #ifdef OLD_TIMEOUT  
  396.   if(s->has_timeout)
  397.     wf_remove_timeout(s->timer_id);
  398. #endif
  399.   s->has_timeout=0;
  400. }
  401.  
  402.  
  403. /* Stop the slaves */
  404.  
  405. static void
  406. stop_slaves(wf)
  407.     wf_state *wf;
  408. { int i;
  409.   for(i=0; i<wf->n_chunks; i++)
  410.   { chunk *c = wf->chunks[i];
  411.     wf_free((char *) (c->client_data));
  412.     wf_free((char *) (c->slave_data));
  413.     wf_free((char *) c);
  414.   }
  415.   /* don't shrink the chunk index array; we probably need it again */
  416.   wf->n_chunks=0;
  417.   wf->sequence++;
  418.   wf->drawn.prev=wf->drawn.next= &wf->drawn;
  419. }
  420.  
  421.  
  422. /* Put the specified slave to work, or stop all slaves if all work is done */
  423.  
  424. static void 
  425. whip_slave(wf, s)
  426.   wf_state *wf;
  427.   slave *s;
  428. { struct
  429.   { WhipMessage m;
  430.     char data[MAX_WORKPACKET_SIZE];
  431.   } mm; /* buffer for building the message to send */
  432.   chunk *c;
  433.   if(s->disabled)
  434.     return;
  435.   if(queue_empty(&wf->to_draw))
  436.   { stop_slaves(wf);
  437.   }
  438.   else /* queue not empty */
  439.   { /* MsWidget msw; */
  440.     c=queue_head(&wf->to_draw);
  441.     /* msw=c->wid; */
  442.     mm.m.header.magic=htons(MAGIC);
  443.     mm.m.header.type=htons(WHIP_MESSAGE);
  444.     mm.m.header.version=htons(VERSION);
  445.     mm.m.header.format=htons(DATA_FORMAT);
  446.     mm.m.id.pid=wf->pid;
  447.     mm.m.id.seq=wf->sequence;
  448.     mm.m.id.slave_no=s->no;
  449.     mm.m.id.chunk_no=c->no;
  450.  
  451.     if(c->slave_datalen > MAX_WORKPACKET_SIZE)
  452.       wf_error("work packet too large");
  453.     bcopy(c->slave_data, mm.m.data, c->slave_datalen);
  454.  
  455.     if(sendto(wf->my_socket, 
  456.           (char *)&mm, sizeof(mm.m)+c->slave_datalen,
  457.           0, (struct sockaddr *) &s->name,
  458.           (int) sizeof(s->name)) == -1)
  459.     { wf_warn("error sending datagram, use of affected server disabled");
  460.       s->disabled=1; /* consider this slave unusable */
  461.     }
  462.     
  463.     timeout_set(s); 
  464.     /* move the chunk from the head to the tail of the queue */
  465.     queue_delete(c);
  466.     queue_add(&wf->to_draw, c);
  467.   }
  468. }
  469.  
  470.  
  471. /* Handle a reply from a slave */
  472.  
  473. static void 
  474. handle_reply_msg(wf, msg) 
  475.      wf_state *wf;
  476.      Message *msg;
  477. { char *client;
  478.   slave *s;
  479.   chunk *c;
  480.   
  481.   unsigned int pid=msg->reply.id.pid;
  482.   unsigned int seqno=msg->reply.id.seq;
  483.   unsigned int chunkno=msg->reply.id.chunk_no;
  484.   unsigned int slaveno=msg->reply.id.slave_no;
  485.   int late;
  486.   
  487.   if(pid != wf->pid) return;
  488.   if(slaveno >= wf->n_slaves) return;
  489.   s= wf->slaves[slaveno];
  490.   s->n_packets++;
  491.   if(seqno != wf->sequence) 
  492.   { s->n_late_packets++;
  493.     return;
  494.   }
  495.   if(chunkno >= wf->n_chunks) return;
  496.   c= wf->chunks[chunkno];
  497.   
  498.   timeout_unset(s);
  499.  
  500.   client=c->client;
  501.  
  502.   /* the chunk is too late if the client has gone away or it has */
  503.   /* been drawn already */
  504.   late = (!client || c->drawn);
  505.  
  506.   if(late) /* ignore the message if the chunk arrived too late */
  507.   { s->n_late_packets++;
  508.   }
  509.   else
  510.   { queue_delete(c);
  511.     queue_add(&wf->drawn, c);
  512.     c->drawn=1;
  513.   }
  514.   /* Put the slave to work again, or stop all slaves if there is */
  515.   /* no more work */
  516.   whip_slave(wf, s);
  517.   
  518.   if(!late)
  519.   { wf_draw(client, c->client_data, (char *) &(msg->reply.data));
  520.     s->mi_count += ntohl(msg->reply.mi_count);
  521.   }
  522. }
  523.  
  524.  
  525. /* This function is called by the Ms widget for each chunk it */
  526. /* wants to be calculated */
  527.  
  528. void 
  529. wf_dispatch_chunk(wf, client, client_data, client_datalen,
  530.           slave_data, slave_datalen)
  531.      wf_state *wf;
  532.      char *client;
  533.      char *client_data;
  534.      unsigned int client_datalen;
  535.      char *slave_data;
  536.      unsigned int slave_datalen;
  537. { int chunkno=wf->n_chunks++;
  538.   chunk *c=(chunk *) wf_alloc(sizeof(chunk));
  539.   c->client_data=(char *) wf_alloc(client_datalen);
  540.   bcopy(client_data, c->client_data, client_datalen);
  541.   c->slave_data=(char *) wf_alloc(slave_datalen);
  542.   bcopy(slave_data, c->slave_data, slave_datalen);
  543.   c->slave_datalen=slave_datalen;
  544.   c->client=client;
  545.   c->drawn=0;
  546.   c->no=chunkno;
  547.   /* grow the chunk index if necessary */
  548.   if(chunkno >= wf->max_chunks)
  549.   { wf->max_chunks *= 2;
  550.     wf->chunks=(chunk **) wf_realloc((char *) wf->chunks,
  551.                        wf->max_chunks*sizeof(chunk *));
  552.   }
  553.   wf->chunks[chunkno]=c;
  554.   queue_add(&wf->to_draw, c);
  555. }
  556.  
  557.  
  558. /* Make sure all the slaves are put to work */
  559.  
  560. void wf_restart(wf)
  561.   wf_state *wf;
  562. { int i;
  563.   for(i=0; i < wf->n_slaves; i++)
  564.   { whip_slave(wf, wf->slaves[i]);
  565.   }
  566. }
  567.  
  568.  
  569. /* Handle the situation of a client aborting prematurely */
  570.  
  571. void wf_client_died(wf, cli)
  572.   wf_state *wf; char *cli;
  573. { chunk *c;
  574.   chunk *next_c;
  575.   /* Remove all chunks of the dead widget from the work queue */
  576.   /* and nullify their widget pointer fields so that late packets */
  577.   /* won't reference the nonexistent widget */
  578.   for(c=wf->to_draw.next; c != &wf->to_draw; c=next_c)
  579.   { /* need to use a temporary variable because */
  580.     /* relinking the chunk fouls up the .next field */
  581.     next_c=c->next; 
  582.     if(c->client == cli)
  583.     { c->client = NULL;
  584.       queue_delete(c);
  585.       queue_add(&wf->drawn, c);
  586.     }
  587.   }
  588. }
  589.  
  590.  
  591. /* Print performance statistics */
  592.  
  593. void wf_print_stats(wf, f)
  594.   wf_state *wf; FILE *f;
  595. { int i;
  596.   int active=0;
  597.   unsigned long mi_tot=0;
  598.   fprintf(f, "\n%-22s %10s %10s %10s %10s\n",
  599.      "Host", "iterations", "packets", "timeouts", "late");
  600.   for(i=0; i<wf->n_slaves; i++)
  601.   { slave *s= wf->slaves[i];
  602.     fprintf(f, "%-22s %10lu %10u %10u %10u\n",
  603.        s->name_string, 
  604.        s->mi_count, s->n_packets, s->n_timeouts, s->n_late_packets);
  605.     if(s->mi_count)
  606.       active++;
  607.     mi_tot += s->mi_count;
  608.   }
  609.   fprintf(f, "%d servers, %d active, %lu iterations total\n",
  610.      wf->n_slaves, active, mi_tot);
  611.   fflush(f);
  612. }
  613.  
  614.  
  615. /* Make the socket available */
  616. int wf_socket(wf)
  617.   wf_state *wf;
  618. { return(wf->my_socket);
  619. }
  620.  
  621. /* Make the maximum message size available */
  622. unsigned
  623. wf_max_message_size()
  624. { return(DATAGRAM_BYTES-sizeof(ReplyHeader));
  625. }
  626.  
  627. #ifndef OLD_TIMEOUT
  628. void wf_tick(wf)
  629.      wf_state *wf;
  630. { int i;
  631.   unsigned n_slaves = wf->n_slaves;
  632.   long now = time((long *) 0);
  633.   for(i=0; i<n_slaves; i++)
  634.   { slave *s = s=wf->slaves[i];
  635.     if(s->has_timeout && s->timeout_at <= now)
  636.     { wf_timed_out((char *) s);
  637.     }
  638.   }
  639. }
  640. #endif
  641.